package org.example.table;

import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.sql.*;
import org.example.domin.Result;
import org.example.domin.TobjLdHour;
import org.example.domin.TobjLdHourVa;
import org.example.utils.JDBCUtils;
import scala.Tuple2;
import scala.Tuple3;
import scala.Tuple4;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.stream.Collectors;

import static org.example.concost.Concost.*;
import static org.example.utils.TimeConversion.convertUToTime;

/**
 * Spark batch job that syncs one day's E_MP_*_CURVE minute-curve data from the
 * Hive ODS layer (ods_amr20_hbase) into the MySQL DWS hour tables, merging the
 * per-phase readings (va/vb/vc) of each measurement point and time slot.
 *
 * @author lwc
 * @date 2023/12/21 20:41
 */
@Slf4j
public class UAICurveDws {

    /**
     * Entry point. With at least one program argument the session uses whatever
     * master the cluster submit provides; with no arguments it forces
     * {@code local[*]} for a local/dev run. The session is always stopped, even
     * when the job throws.
     *
     * @param args any argument switches the job to cluster-master mode
     */
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("e_mp_u_curveTable");
        SparkSession.Builder builder = SparkSession.builder()
                .appName("HBase to MySQL Sync")
                .config(sparkConf)
                .enableHiveSupport();
        if (args.length >= 1) {
            // cluster submit: master comes from spark-submit, do NOT force local[*]
            log.info("使用集群提交的 master 配置");
        } else {
            // no args: local/dev run, use all available cores
            builder.config("spark.master", "local[*]");
            log.info("未传参数，使用 local[*] 本地模式");
        }
        SparkSession spark = builder.getOrCreate();
        try {
            Data(spark, sparkConf);
        } finally {
            // always release the session, even if Data() throws
            spark.stop();
        }
    }


    /**
     * Pulls one day's curve rows from Hive, joins them with the MySQL meter-point
     * list, merges the per-(point, time, key) phase readings and rewrites the
     * matching time window of the MySQL DWS table (delete old window, append new).
     *
     * @param spark     active Spark session with Hive support enabled
     * @param sparkConf job configuration; all inputs come from spark.app.* keys
     */
    public static void Data(SparkSession spark, SparkConf sparkConf) {
        log.info("=================开始查询E_MP_U_CURVE数据===================");
        // startDate = today (yyyyMMdd), endDate = yesterday; note the delete below
        // treats endDate as the lower bound and startDate as the upper bound
        String startDate = DateUtil.beginOfDay(DateUtil.date()).toString("yyyyMMdd");
        String endDate = DateUtil.offsetDay(DateUtil.parse(DateUtil.beginOfDay(DateUtil.date()).toDateStr()), -1).toString("yyyyMMdd");

        String table = sparkConf.get("spark.app.table", "e_mp_u_curve");
        String startTime = sparkConf.get("spark.app.startTime", endDate);
        String endTime = sparkConf.get("spark.app.endTime", startDate);
        String dwsMeter = sparkConf.get("spark.app.dwsMeter", "t_bus_meterpointmng");
        String dwsTable = sparkConf.get("spark.app.dwsTable", "t_obj_202312ld_hour_va");
        String url = sparkConf.get("spark.app.url", "jdbc:mysql://localhost:3306/test");
        String username = sparkConf.get("spark.app.username", "root");
        String password = sparkConf.get("spark.app.password", "9XME3z94xs9nhCj");

        // never log credentials
        log.info("mysql 连接的参数连接：{},表名:{},用户:{}", url, dwsMeter, username);
        log.info("hive 参数，表名：{},开始时间：{},结束时间：{}", table, startDate, endDate);

        Dataset<Row> databases = spark.sql("show databases");
        databases.show();
        try {
            // meter points registered in MySQL; only their curves are processed
            Dataset<Row> mysqlData = spark.read()
                    .format("jdbc")
                    .option("url", url)
                    .option("dbtable", "(select dev_id from " + dwsMeter + ") as temp")
                    .option("user", username)
                    .option("password", password)
                    .load();
            mysqlData.show();

            // day-partitioned curve rows from the Hive ODS table
            StringBuilder hiveSql = new StringBuilder();
            hiveSql.append("select * from ods_amr20_hbase.").append(table).append(" where ds >= '")
                    .append(startTime).append("'").append(" and ds <='").append(endTime).append("'");
            log.info("执行的hiveSql为：{}", hiveSql);
            Dataset<Row> hiveData = spark.sql(hiveSql.toString());
            // keep only curve rows whose device is a registered meter point
            Dataset<Row> rowDataset = mysqlData.join(hiveData, mysqlData.col("dev_id").equalTo(hiveData.col("dev_id")));

            // explode each wide row (one column per minute) into per-minute entities
            JavaRDD<TobjLdHourVa> tobjLdHourVaJavaRDD = rowDataset.javaRDD().flatMap((FlatMapFunction<Row, TobjLdHourVa>)
                    row -> convertRowToDevTimeData(row, table).iterator());

            // key = (point, collection time, key name, input time); value = (va, vb, vc)
            JavaPairRDD<Tuple4<Long, String, String, String>, Tuple3<Double, Double, Double>> pairedRDD = tobjLdHourVaJavaRDD.mapToPair(data -> {
                Tuple4<Long, String, String, String> key = new Tuple4<>(data.getF_measurement_points(),
                        data.getF_data_collection_time(), data.getF_key_name(), data.getF_data_input_time());
                Tuple3<Double, Double, Double> value = new Tuple3<>(data.getData_va(), data.getData_vb(), data.getData_vc());
                return new Tuple2<>(key, value);
            });
            log.info("合并完成");
            // each source entity carries at most one phase value; merging keeps
            // the first non-null reading per phase for the same key
            JavaPairRDD<Tuple4<Long, String, String, String>, Tuple3<Double, Double, Double>> mergedRDD = pairedRDD.reduceByKey(
                    (Function2<Tuple3<Double, Double, Double>, Tuple3<Double, Double, Double>, Tuple3<Double, Double, Double>>) (v1, v2) ->
                            new Tuple3<>(
                                    v1._1() != null ? v1._1() : v2._1(),
                                    v1._2() != null ? v1._2() : v2._2(),
                                    v1._3() != null ? v1._3() : v2._3()));
            // back to bean entities for the JDBC writer
            JavaRDD<TobjLdHourVa> resultRDD = mergedRDD.map(data -> {
                TobjLdHourVa resultData = new TobjLdHourVa();
                resultData.setF_measurement_points(data._1()._1());
                resultData.setF_data_collection_time(data._1()._2());
                resultData.setF_key_name(data._1()._3());
                resultData.setF_delete(0L);
                resultData.setData_va(data._2()._1());
                resultData.setData_vb(data._2()._2());
                resultData.setData_vc(data._2()._3());
                // input time is stamped with "now"; the value carried in the key
                // was only used for grouping
                resultData.setF_data_input_time(DateUtil.date().toDateStr());
                return resultData;
            });
            log.info("处理完成");
            Dataset<TobjLdHourVa> dataset = spark.createDataset(resultRDD.rdd(), Encoders.bean(TobjLdHourVa.class));

            dataset.show();
            log.info("保存前进行删除操作，");
            // delete the window first so a re-run does not duplicate rows
            StringBuilder deleteSql = new StringBuilder("delete from " + dwsTable + " where f_data_collection_time <='" + startDate + "' and f_data_collection_time >='" + endDate + "'");
            if (table.equals("e_mp_u_curve")) {
                deleteSql.append(" and f_key_name in ('10128E15')");
            } else if (table.equals("e_mp_i_curve")) {
                deleteSql.append(" and f_key_name in ('10128E25')");
            }
            log.info("删除的语句为：{}", deleteSql);
            // try-with-resources guarantees the JDBC handles are closed
            try (Connection connection = JDBCUtils.getConnection(url, username, password);
                 Statement statement = connection.createStatement()) {
                boolean execute = statement.execute(deleteSql.toString());
                log.info("删除结果为：{}", execute);
            }
            dataset.write()
                    .format("jdbc")
                    .option("url", url)
                    .option("dbtable", dwsTable)
                    .option("user", username)
                    .option("password", password)
                    .mode(SaveMode.Append)
                    .save();
        } catch (Exception e) {
            // keep the full stack trace in the log instead of stdout
            log.error("出现错误：{}", e.getMessage(), e);
        }

        log.info("=================开始查询E_MP_U_CURVE 结束===================");
    }


    /**
     * Expands one joined curve row (one column per minute index, e.g.
     * {@code u0000}..{@code u2359}) into per-minute DWS entities; minutes whose
     * column is null or absent from the schema are skipped.
     *
     * @param row      joined Hive/MySQL row; must carry dev_id, ds and phase_flag
     * @param dwsTable source curve table name; selects the column prefix and key name
     * @return one entity per non-null minute column (possibly empty)
     */
    public static List<TobjLdHourVa> convertRowToDevTimeData(Row row, String dwsTable) {
        List<TobjLdHourVa> devTimeDataList = new ArrayList<>();
        Result result = tableGetResult(row, dwsTable);

        // loop invariants hoisted out of the 2360-iteration loop
        Long devId = Long.valueOf(getRowObject(row, "dev_id").toString());
        Object dataDate = getRowObject(row, "ds");
        Object phaseFlag = getRowObject(row, "phase_flag");
        String inputTime = DateUtil.date().toDateStr();
        String dayPrefix = DateUtil.parse(dataDate.toString(), "yyyyMMdd").toDateStr();

        // column names encode hour*100 + minute, zero-padded to 4 digits
        for (int i = 0; i <= 2359; i++) {
            String columnName = result.getType() + String.format("%04d", i);
            Object dataDouble = getRowObject(row, columnName);
            if (ObjectUtil.isNotNull(dataDouble)) {
                TobjLdHourVa devTimeData = new TobjLdHourVa();
                devTimeData.setF_measurement_points(devId);
                devTimeData.setF_data_input_time(inputTime);
                // the phase flag decides which of va/vb/vc this reading fills
                tobjHourVaSetAvlite(phaseFlag, devTimeData, dataDouble);
                devTimeData.setF_data_collection_time(dayPrefix + " " + convertUToTime(columnName));
                devTimeData.setF_delete(0L);
                devTimeData.setF_key_name(result.getKeyName());
                devTimeDataList.add(devTimeData);
            }
        }

        return devTimeDataList;
    }

    /**
     * Pages through the DWS meter-point table and, for each page, pulls the
     * matching Hive curve rows, folds them into hour entities, deletes the
     * already-stored window for those points and appends the fresh rows.
     *
     * @param count      total number of meter points to page over
     * @param spark      active Spark session
     * @param sparkConf  job configuration (spark.app.pageSize controls page size)
     * @param table      Hive curve table name; decides which entity type is built
     * @param startDate  lower ds bound (inclusive)
     * @param endDate    upper ds bound (exclusive)
     * @param meterTable MySQL meter-point table being paged
     * @param statement  open JDBC statement used for paging and deletes
     * @param connection owning connection of {@code statement}; closed by the caller
     * @param dwsTable   MySQL target table
     * @param url        JDBC url of the target database
     * @param password   JDBC password
     * @param username   JDBC user
     */
    @SneakyThrows
    private static void whileInsert(long count, SparkSession spark, SparkConf sparkConf, String table, String startDate, String endDate, String meterTable, Statement statement, Connection connection, String dwsTable, String url, String password, String username) {
        log.info("总共的数据为：{}", count);
        long pageNum = 0;
        Long pageSize = sparkConf.getLong("spark.app.pageSize", 1);
        log.info("接收到的pageSize为：{}", pageSize);
        do {
            // the limit offset must advance by a whole page (pageNum * pageSize);
            // using pageNum alone re-reads overlapping rows whenever pageSize > 1
            StringBuilder meterSql = new StringBuilder("select * from ").append(meterTable).append("  limit ").append(pageNum * pageSize).append(" , ").append(pageSize);
            log.info("查询 dws meterPoint的语句为：{}", meterSql);
            List<JSONObject> jsonObjectList;
            try (ResultSet resultSet = statement.executeQuery(meterSql.toString())) {
                jsonObjectList = convertResultSetToList(resultSet);
            }
            if (!jsonObjectList.isEmpty()) {
                String ids = jsonObjectList.stream().map(json -> json.getString("dev_id")).collect(Collectors.joining(","));
                String sql = String.format("SELECT * FROM  ods_amr20_hbase.%s where ds >= '%s' and ds < '%s' and dev_id in (%s) order by ds desc",
                        table, startDate, endDate, ids);
                log.info("分页查询的语句为：{}", sql);
                Dataset<Row> sqlData = spark.sql(sql);
                long pageCount = sqlData.count();
                log.info("分页查询的数量：{}", pageCount);
                try {
                    if (pageCount > 0) {
                        List<Row> rowList = sqlData.collectAsList();
                        // group the page's rows by device before folding into entities
                        Map<String, List<Row>> listMap = rowList.stream().collect(Collectors.groupingBy(map -> map.getAs("dev_id")));
                        Iterator<Map.Entry<String, List<Row>>> entryIterator = listMap.entrySet().stream().iterator();
                        Dataset<Row> personDataset = null;
                        if (table.equals(EMPICURVE) || table.equals(EMPUCURVE)) {
                            // i/u curves are stored as TobjLdHourVa entities
                            HashMap<Long, HashMap<String, TobjLdHourVa>> tobjLdHourVaHashMap = new HashMap<>();
                            while (entryIterator.hasNext()) {
                                Map.Entry<String, List<Row>> next = entryIterator.next();
                                List<Row> value = next.getValue();
                                value.forEach(row -> convertResultVa(table, row, tobjLdHourVaHashMap));
                            }
                            ArrayList<TobjLdHourVa> tobjLdHourVas = new ArrayList<>();
                            tobjLdHourVaHashMap.values().forEach(map -> tobjLdHourVas.addAll(map.values()));
                            List<TobjLdHourVa> ldHourVas = tobjLdHourVas.stream().distinct().collect(Collectors.toList());
                            log.info("要入库的数据为：{}条", ldHourVas.size());
                            Encoder<TobjLdHourVa> personEncoder = Encoders.bean(TobjLdHourVa.class);
                            personDataset = spark.createDataset(ldHourVas, personEncoder).toDF();
                            personDataset.show();
                        } else if (table.equals(EMPPCURVE) || table.equals(EMPPFCURVE)) {
                            // p/pf curves are stored as TobjLdHour entities
                            HashMap<Long, HashMap<String, TobjLdHour>> tobjLdHourVaHashMap = new HashMap<>();
                            while (entryIterator.hasNext()) {
                                Map.Entry<String, List<Row>> next = entryIterator.next();
                                List<Row> value = next.getValue();
                                value.forEach(row -> convertResult(table, row, tobjLdHourVaHashMap));
                            }
                            List<TobjLdHour> tobjLdHourVas = new ArrayList<>();
                            tobjLdHourVaHashMap.values().forEach(map -> tobjLdHourVas.addAll(map.values()));
                            List<TobjLdHour> tobjLdHours = tobjLdHourVas.stream().distinct().collect(Collectors.toList());
                            log.info("要入库的数据为：{}条", tobjLdHours.size());
                            Encoder<TobjLdHour> personEncoder = Encoders.bean(TobjLdHour.class);
                            personDataset = spark.createDataset(tobjLdHours, personEncoder).toDF();
                            personDataset.show();
                        }

                        if (personDataset == null) {
                            // unknown table: nothing was built, skip delete/insert for this page
                            log.warn("未识别的表：{}，跳过本页入库", table);
                        } else {
                            log.info("循环完成之后数据,,,,,删除postgrealf 数据中………………");
                            // delete the stored window for these points in batches of
                            // up to 100 ids; flush on a full batch OR the last element
                            // (the old "i > 0 && i % 100 == 0" test never fired for a
                            // single-element page, leaving stale rows behind)
                            int batchSize = 100;
                            StringBuilder idsJoiner = new StringBuilder();
                            for (int i = 0; i < jsonObjectList.size(); i++) {
                                idsJoiner.append(jsonObjectList.get(i).getString("dev_id")).append(",");
                                boolean batchFull = (i + 1) % batchSize == 0;
                                boolean lastItem = i == jsonObjectList.size() - 1;
                                if (batchFull || lastItem) {
                                    // drop the trailing comma before building the IN list
                                    jdbcDelete(startDate, endDate, statement, dwsTable, idsJoiner.substring(0, idsJoiner.length() - 1));
                                    idsJoiner.setLength(0);
                                }
                            }
                            log.info("删除完毕! ^^^");
                            personDataset.write()
                                    .format("jdbc")
                                    .option("url", url)
                                    .option("dbtable", dwsTable)
                                    .option("user", username)
                                    .option("password", password)
                                    .mode(SaveMode.Append)
                                    .save();
                            log.info("入库完成!!!");
                        }
                    }
                } catch (Exception e) {
                    // keep the full stack trace in the log instead of stdout
                    log.error("循环或者存储时发生错误:{}", e.getMessage(), e);
                }
            }
            pageNum++;
        } while ((pageNum * pageSize) < count);
    }

    /**
     * Deletes the DWS rows of the given measurement points whose collection time
     * falls inside the [startDate, endDate] window, so a re-run can re-insert cleanly.
     *
     * @param startDate window start (any Hutool-parseable date; reformatted to yyyy-MM-dd)
     * @param endDate   window end (same format handling)
     * @param statement open JDBC statement the delete is executed on
     * @param dwsTable  target DWS table
     * @param idsJoiner comma-separated dev_id list for the IN clause
     * @throws SQLException on any JDBC error
     */
    private static void jdbcDelete(String startDate, String endDate, Statement statement, String dwsTable, String idsJoiner) throws SQLException {
        String fromDay = DateUtil.format(DateUtil.parse(startDate), "yyyy-MM-dd");
        String toDay = DateUtil.format(DateUtil.parse(endDate), "yyyy-MM-dd");
        String deleteSql = "delete from " + dwsTable
                + " where f_data_collection_time >='" + fromDay + "' and "
                + "f_data_collection_time <= '" + toDay
                + "' and f_measurement_points in (" + idsJoiner + ")";
        log.info("执行的删除语句为：{}", deleteSql);
        statement.execute(deleteSql);
    }

    /**
     * Folds the per-minute readings of one Hive row into the shared
     * devId -> (collection time -> entity) accumulator, merging the phases
     * (va/vb/vc) of the same instant into a single {@link TobjLdHourVa}.
     *
     * <p>Fixes vs the previous version: the loop now includes the final minute
     * column ({@code i <= 2359}, matching convertRowToDevTimeData), and the phase
     * flag string is passed to tobjHourVaSetAvlite — previously the whole
     * {@link Result} object was passed, whose equals("1"/"2"/"3") could never
     * match, so no phase value was ever set.
     *
     * @param table               source curve table name
     * @param row                 Hive row to fold
     * @param tobjLdHourVaHashMap shared accumulator, mutated in place
     */
    private static void convertResultVa(String table, Row row, HashMap<Long, HashMap<String, TobjLdHourVa>> tobjLdHourVaHashMap) {
        Result result = tableGetResult(row, table);
        Object devId = getRowObject(row, "dev_id");
        Object dataDate = getRowObject(row, "ds");
        for (int i = 0; i <= 2359; i++) {
            String columnName = result.getType() + String.format("%04d", i);
            Object dataDouble = getRowObject(row, columnName);
            if (ObjectUtil.isNotNull(dataDouble)) {
                String parseDate = DateUtil.parse(dataDate.toString(), "yyyyMMdd").toDateStr() + " " + convertUToTime(columnName);
                HashMap<String, TobjLdHourVa> byTime = tobjLdHourVaHashMap.computeIfAbsent(Long.parseLong(devId.toString()), k -> new HashMap<>());

                TobjLdHourVa entity = byTime.get(parseDate);
                if (ObjectUtil.isNull(entity)) {
                    entity = new TobjLdHourVa();
                    entity.setF_measurement_points(Long.valueOf(devId.toString()));
                    entity.setF_data_collection_time(parseDate);
                    entity.setF_data_input_time(DateUtil.now());
                    entity.setF_key_name(result.getKeyName());
                    entity.setF_delete(0L);
                }
                // pass the flag string, not the Result object, so the "1"/"2"/"3"
                // comparison inside tobjHourVaSetAvlite can actually match
                tobjHourVaSetAvlite(result.getPhaseFlag(), entity, dataDouble);
                byTime.put(parseDate, entity);
            }
        }
    }


    /**
     * Folds the per-minute readings of one Hive row into the shared
     * devId -> (collection time -> entity) accumulator, merging readings of the
     * same instant into a single {@link TobjLdHour}.
     *
     * <p>Fix vs the previous version: the loop now includes the final minute
     * column ({@code i <= 2359}), matching convertRowToDevTimeData.
     *
     * @param table               source curve table name
     * @param row                 Hive row to fold
     * @param tobjLdHourVaHashMap shared accumulator, mutated in place
     */
    private static void convertResult(String table, Row row, HashMap<Long, HashMap<String, TobjLdHour>> tobjLdHourVaHashMap) {
        Result result = tableGetResult(row, table);
        Object devId = getRowObject(row, "dev_id");
        Object dataDate = getRowObject(row, "ds");
        for (int i = 0; i <= 2359; i++) {
            String columnName = result.getType() + String.format("%04d", i);
            Object dataDouble = getRowObject(row, columnName);
            if (ObjectUtil.isNotNull(dataDouble)) {
                String parseDate = DateUtil.parse(dataDate.toString(), "yyyyMMdd").toDateStr() + " " + convertUToTime(columnName);
                HashMap<String, TobjLdHour> byTime = tobjLdHourVaHashMap.computeIfAbsent(Long.parseLong(devId.toString()), k -> new HashMap<>());

                TobjLdHour entity = byTime.get(parseDate);
                if (ObjectUtil.isNull(entity)) {
                    entity = new TobjLdHour();
                    entity.setF_measurement_points(Long.valueOf(devId.toString()));
                    entity.setF_data_input_time(DateUtil.now());
                    entity.setF_data_collection_time(parseDate);
                    entity.setF_key_name(result.getKeyName());
                    entity.setF_delete(0L);
                }
                tobjHourSetAvlite(result, entity, dataDouble, table);
                byTime.put(parseDate, entity);
            }
        }
    }


    /**
     * Routes one reading into the matching field (vs/va/vb/vc) of a
     * {@link TobjLdHour}, keyed on the row's phase flag / data-type constant.
     *
     * @param result     carries the flag ("0".."3" for pf curves, data-type
     *                   constants for p curves)
     * @param tobjLdHour target entity, mutated in place
     * @param dataDouble raw reading; parsed from its toString()
     * @param table      source table name, selects which flag dialect applies
     */
    private static void tobjHourSetAvlite(Result result, TobjLdHour tobjLdHour, Object dataDouble, String table) {
        if (table.equals(EMPPFCURVE)) {
            String flag = result.getPhaseFlag();
            // "0" = total, "1"/"2"/"3" = the individual phase slots
            if (flag.equals("0")) {
                tobjLdHour.setData_vs(Double.parseDouble(dataDouble.toString()));
            }
            if (flag.equals("1")) {
                tobjLdHour.setData_va(Double.parseDouble(dataDouble.toString()));
            }
            if (flag.equals("2")) {
                tobjLdHour.setData_vb(Double.parseDouble(dataDouble.toString()));
            }
            if (flag.equals("3")) {
                tobjLdHour.setData_vc(Double.parseDouble(dataDouble.toString()));
            }
        } else if (table.equals(EMPPCURVE)) {
            String dataType = result.getPhaseFlag();
            // each E39/E49/E59 family has an S/A/B/C variant mapping to vs/va/vb/vc
            if (dataType.equals(S10128E49) || dataType.equals(S10128E39) || dataType.equals(S10128E59)) {
                tobjLdHour.setData_vs(Double.parseDouble(dataDouble.toString()));
            } else if (dataType.equals(A10128E49) || dataType.equals(A10128E39) || dataType.equals(A10128E59)) {
                tobjLdHour.setData_va(Double.parseDouble(dataDouble.toString()));
            } else if (dataType.equals(B10128E49) || dataType.equals(B10128E39) || dataType.equals(B10128E59)) {
                tobjLdHour.setData_vb(Double.parseDouble(dataDouble.toString()));
            } else if (dataType.equals(C10128E49) || dataType.equals(C10128E39) || dataType.equals(C10128E59)) {
                tobjLdHour.setData_vc(Double.parseDouble(dataDouble.toString()));
            }
        }
    }

    /**
     * Copies one reading into the phase field selected by the flag:
     * "1" -> va, "2" -> vb, "3" -> vc; any other flag leaves the entity untouched.
     *
     * @param phaseFlag phase selector, compared via equals against "1"/"2"/"3"
     * @param target    entity to fill, mutated in place
     * @param rawValue  raw reading; parsed from its toString()
     */
    private static void tobjHourVaSetAvlite(Object phaseFlag, TobjLdHourVa target, Object rawValue) {
        if (phaseFlag.equals("1")) {
            target.setData_va(Double.parseDouble(rawValue.toString()));
        }
        if (phaseFlag.equals("2")) {
            target.setData_vb(Double.parseDouble(rawValue.toString()));
        }
        if (phaseFlag.equals("3")) {
            target.setData_vc(Double.parseDouble(rawValue.toString()));
        }
    }


    /**
     * Copies every row of a ResultSet into a list of JSONObjects keyed by
     * column label.
     *
     * @param resultSet open cursor; fully consumed here but not closed
     * @return one JSONObject per row (possibly empty)
     * @throws SQLException on any JDBC access error
     */
    private static List<JSONObject> convertResultSetToList(ResultSet resultSet) throws SQLException {
        List<JSONObject> jsonList = new ArrayList<>();
        // metadata is loop-invariant: fetch it once instead of once per row/column
        ResultSetMetaData metaData = resultSet.getMetaData();
        int totalColumns = metaData.getColumnCount();
        while (resultSet.next()) {
            JSONObject obj = new JSONObject();
            for (int i = 1; i <= totalColumns; i++) {
                obj.put(metaData.getColumnLabel(i), resultSet.getObject(i));
            }
            jsonList.add(obj);
        }
        log.info("拿出的条数为；{}", jsonList.size());
        return jsonList;
    }


    /**
     * Builds the {@link Result} descriptor (flag, key name, column prefix) for a
     * row, based on which curve table it came from. Returns null for an unknown
     * table, or for e_mp_p_curve rows whose data_type is null or unrecognised.
     *
     * @param data  source row; reads phase_flag and data_type
     * @param table curve table name
     * @return descriptor, or null when no mapping applies
     */
    private static Result tableGetResult(Row data, String table) {
        String phaseFlag = (String) getRowObject(data, "phase_flag");
        String dataType = (String) getRowObject(data, "data_type");
        if (table.equals(EMPUCURVE)) {
            return new Result(phaseFlag, E215, "u", null);
        }
        if (table.equals(EMPPFCURVE)) {
            return new Result(phaseFlag, E89, "c", null);
        }
        if (table.equals(EMPICURVE)) {
            return new Result(phaseFlag, E25, "i", null);
        }
        if (table.equals(EMPPCURVE) && dataType != null) {
            // the data_type constant family (E39/E49/E59) picks the key name
            if (dataType.equals(S10128E49) || dataType.equals(A10128E49) || dataType.equals(B10128E49) || dataType.equals(C10128E49)) {
                return new Result(dataType, E49, "p", null);
            }
            if (dataType.equals(S10128E59) || dataType.equals(A10128E59) || dataType.equals(B10128E59) || dataType.equals(C10128E59)) {
                return new Result(dataType, E59, "p", null);
            }
            if (dataType.equals(S10128E39) || dataType.equals(A10128E39) || dataType.equals(B10128E39) || dataType.equals(C10128E39)) {
                return new Result(dataType, E39, "p", null);
            }
        }
        return null;
    }


    /**
     * Null-safe field accessor: returns the row's value for {@code point}, or
     * null when the column is absent from the schema or its value is null.
     *
     * @param data  source row
     * @param point column name to read
     * @return the column value, or null
     */
    private static Object getRowObject(Row data, String point) {
        // check schema membership first: getAs() on a missing column would throw
        boolean fieldExists = data.schema().getFieldIndex(point).isDefined();
        if (!fieldExists || ObjectUtil.isNull(data.getAs(point))) {
            return null;
        }
        return data.getAs(point);
    }

}
